Scheduled App
What is a Scheduled App?
A scheduled app is an app that runs continuously on the server at a specified interval for the duration of the active data stream. Scheduled apps can be data time, natural time or depth based.
The app receives real-time data as events, each of which contains records. It receives those events whenever the data provider delivers that data to Corva.
The data is typically continuous and sampled at a specified interval, e.g. 60-second data or five-foot data.
A scheduled app receives corva#wits data for time-based drilling, corva#wits-1ft data for depth-based drilling, corva#completion.wits data for completions, and corva#wireline.wits data for wireline.
Dev Center scheduled apps are built on AWS Lambda. To learn more about AWS Lambda functions please see Best practices for working with AWS Lambda functions.
When to build a Scheduled App?
Typical use cases for a scheduled app are when your business logic requires real-time data at regular intervals greater than 1 second. Because most uses of real-time data do not require fine granularity, the majority of real-time applications are expected to be scheduled apps.
I require real-time data
I require regularly sampled data with time interval > 1 second (Data Time Event)
I require regularly sampled data with depth interval > 1 foot (Depth Event)
I require a backend app to be invoked when new time based data arrives from the field in order to execute logic (Data Time Event)
I require a backend app to be invoked when new depth based data arrives from the field in order to execute logic (Depth Event)
I require a backend app to be invoked on a timer in order to execute logic (Natural Time Event)
I require the app to be attached to the asset stream(s)
I do not require the app to be invoked from a front-end app
I do not require the app to be triggered
If you require real-time data at one second or 1 foot intervals (or the next available measurement), you should use a Stream App instead.
What is the difference between a Data Time app and a Natural Time app?
Please review the Data Time vs Natural Time tutorial.
Where to find the data to build a Scheduled App?
The Corva Dev Center Dataset Explorer is an easy way to see a list of Corva datasets and the data stored in each dataset. Another option is to use Corva's API tools, located on the Dev Center Intro page, and make a GET API request to your desired dataset.
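As a rough illustration of the GET request mentioned above, the sketch below only builds the URL for a dataset query and does not send it. The host is a placeholder, the `api/v1/data/{provider}/{dataset}/` path shape is borrowed from the POST call used later in this tutorial, and the JSON-encoded `query` and `sort` parameters and the helper name `build_dataset_url` are assumptions made for illustration. Check the Dev Center API tools for the actual endpoint and authentication headers.

```python
import json
from urllib.parse import urlencode


def build_dataset_url(base_url, provider, dataset, query, sort, limit, fields=None):
    """Build (but do not send) a GET URL for a dataset query. Hypothetical helper."""
    params = {
        # Assumption: query and sort are passed as compact JSON strings.
        "query": json.dumps(query, separators=(",", ":")),
        "sort": json.dumps(sort, separators=(",", ":")),
        "limit": limit,
    }
    if fields:
        params["fields"] = fields
    path = f"api/v1/data/{provider}/{dataset}/"
    return f"{base_url.rstrip('/')}/{path}?{urlencode(params)}"


url = build_dataset_url(
    base_url="https://data-api.example.com",  # placeholder host, not a real endpoint
    provider="corva",
    dataset="wits",
    query={"asset_id": 82891865},
    sort={"timestamp": 1},
    limit=10,
    fields="data.rop",
)
print(url)
```

Keeping the URL construction separate from the HTTP call makes it easy to inspect the request before pointing it at a real environment.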
How to build a Scheduled App?
The following examples will demonstrate the simple building blocks of a scheduled application while utilizing all imported functionalities.
1. Data Time Event
In the example below, the app needs to GET 3 minutes of Rate of Penetration (rop) data from corva#wits, calculate the mean rop value over those 3 minutes (180 seconds), then POST the calculated value to a custom dataset named big-data-energy#example-scheduled-data-time-app.
1.1 Follow the Getting Started Tutorial to install the prerequisites, create and copy app to your local machine.
1.2. Open and review the README.md file.
# Dev Center Python Polling Scheduler Data App
## Getting started
[Python SDK Documentation](https://corva-ai.github.io/python-sdk)
### 1. Install
You need to have `make` installed:
- Windows: [Windows Subsystem for Linux](https://docs.microsoft.com/en-us/windows/wsl/install)
- OS X: [Command Line Tools for Xcode](https://developer.apple.com/download/more/)
- Linux: refer to your distro-specific docs
To install dependencies run:
`make`
### 2. Run Tests
`make test`
### 3. Create app zip
`make bundle`
Example of a README.md file
1.3. Update to latest Python SDK version in requirements.txt file
corva-sdk==1.8.0
pytest==7.1.1
Example of a requirements.txt file for a ScheduledDataTimeEvent app
1.3.1. In the command line run
pip install -U corva-sdk
1.3.2 Or, within the requirements.txt file, set the latest SDK version (e.g. corva-sdk==1.8.0), then run
pip install -r requirements.txt
1.4. Optional: Install make dependency
1.4.1 Install make
make install
make all
1.5 Make adjustments to the manifest.json file
{
"format": 1,
"license": {
"type": "MIT",
"url": "https://www.oandgexample.com/license/"
},
"developer": {
"name": "O&G Company",
"identifier": "oandgc",
"authors": []
},
"application": {
"type": "scheduler",
"key": "big-data-energy.example_scheduler_data_time_app",
"visibility": "private",
"name": "Example Scheduler Data Time App",
"description": "This is the description of my app. You can do great things with it!",
"summary": "More information about this app goes here",
"category": "analytics",
"website": "https://www.oandgexample.com/my-app/",
"segments": [
"drilling"
]
},
"settings": {
"entrypoint": {
"file": "lambda_function",
"function": "lambda_handler"
},
"timeout": 120,
"memory": 128,
"environment": {},
"runtime": "python3.8",
"app": {
"scheduler_type": 2,
"cron_string": "*/3 * * * *"
}
},
"datasets": {
"big-data-energy.example-scheduled-data-time-app": {
"permissions": [
"read",
"write"
]
},
"corva.wits": {
"permissions": [
"read"
]
}
}
}
Example of a manifest.json file for a ScheduledDataTimeEvent app
1.5.1 Set the time interval in the manifest.json file
The time interval is set via the "cron_string" within the "settings.app" object. The minimum time interval that can be set is 1 minute. See the following example of setting the time interval to 3 minutes data time:
"app": {
"scheduler_type": 2,
"cron_string": "*/3 * * * *"
}
The maximum time interval that can be set is once per day. See the following example of setting the time interval to once per day:
"app": {
"scheduler_type": 2,
"cron_string": "0 0 * * *"
}
Example of "cron_string" in the manifest.json file for a ScheduledDataTimeEvent app
Note: A free tool for determining the "cron_string" is the crontab guru.
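A cron string has five space-separated fields: minute, hour, day of month, month, and day of week. The sketch below reads the step out of an "every N minutes" pattern such as the 3-minute example above. The helper name `minute_step` is hypothetical, and it deliberately recognizes only the `*/N * * * *` form rather than full cron syntax.

```python
def minute_step(cron_string):
    """Return N for a '*/N * * * *' cron string, or None for any other pattern."""
    fields = cron_string.split()
    if len(fields) != 5:
        raise ValueError("expected five space-separated cron fields")
    minute, *rest = fields
    is_step = minute.startswith("*/") and minute[2:].isdigit()
    if is_step and all(field == "*" for field in rest):
        return int(minute[2:])
    return None


print(minute_step("*/3 * * * *"))  # 3
print(minute_step("0 0 * * *"))    # None: once per day is not an every-N-minutes pattern
```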
1.5.2 Set read and/or write permissions to datasets in the manifest.json file
Read and write permissions can only be granted to company datasets (non-Corva), e.g. big-data-energy#example-scheduled-data-time-app. Read only permissions can be granted to Corva datasets, e.g. corva#wits. The example below shows how to grant read and write permissions to big-data-energy#example-scheduled-data-time-app and read permissions to corva#wits.
"datasets": {
"big-data-energy.example-scheduled-data-time-app": {
"permissions": [
"read",
"write"
]
},
"corva.wits": {
"permissions": [
"read"
]
}
}
Example of dataset read/write permissions in the manifest.json file for a ScheduledDataTimeEvent app
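The permission rule described above can be checked before uploading. The following is a minimal sketch, assuming the manifest has already been loaded into a Python dict and that Corva-owned datasets are the ones whose key starts with `corva.`; the helper name `find_invalid_write_permissions` is hypothetical.

```python
def find_invalid_write_permissions(manifest, corva_prefix="corva."):
    """List Corva-owned datasets that request write permission."""
    datasets = manifest.get("datasets", {})
    return sorted(
        name
        for name, config in datasets.items()
        if name.startswith(corva_prefix) and "write" in config.get("permissions", [])
    )


manifest = {
    "datasets": {
        "big-data-energy.example-scheduled-data-time-app": {"permissions": ["read", "write"]},
        "corva.wits": {"permissions": ["read"]},
    }
}
print(find_invalid_write_permissions(manifest))  # []
```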
1.5.3 Optional: Set Python Log Levels in environment variables in the manifest.json file
The default Log Level in the Corva SDK is Logger.info(). In order to change the default Log Level, you must set the log level in the environment variables within the manifest.json file. The example below shows how to change the default Log Level to Logger.debug().
"settings": {
"entrypoint": {
"file": "lambda_function",
"function": "lambda_handler"
},
"timeout": 120,
"memory": 128,
"environment": {"LOG_LEVEL": "DEBUG" },
"runtime": "python3.8",
"app": {
"scheduler_type": 2,
"cron_string": "*/3 * * * *"
}
},
Example of Python log levels environment variable setting in the manifest.json file for a ScheduledDataTimeEvent app
Note: To learn about the different log levels, please refer to the Logging HOWTO in Python's logging documentation and to the Corva Logging Documentation.
1.5.4 Optional: Increase the application timeout time in the manifest.json file
If your app runs longer than the timeout value, first make sure that the Lambda function is idempotent, so that a repeated invocation does not produce duplicate results. If the app genuinely needs more time, increase the timeout value. The default timeout value for an application is 120 seconds. The maximum value for the application timeout is 900 seconds. The example below shows the application timeout increased to 240 seconds.
"settings": {
"entrypoint": {
"file": "lambda_function",
"function": "lambda_handler"
},
"timeout": 240,
"memory": 128,
"environment": {"LOG_LEVEL": "DEBUG" },
"runtime": "python3.8",
"app": {
"scheduler_type": 2,
"cron_string": "*/3 * * * *"
}
},
Example of app timeout setting in the manifest.json file for a ScheduledDataTimeEvent app
Note: To learn about Lambda function best practices, please refer to AWS Lambda's documentation: Best practices for working with AWS Lambda functions.
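Idempotency here means that running the same interval twice leaves the same result as running it once. A minimal sketch of one way to get there is shown below: remember the last processed interval end and skip anything at or before it. A plain dict stands in for the cache, and the key name and the helper `process_once` are hypothetical.

```python
def process_once(end_time, cache, work):
    """Run work() only if this interval has not been processed yet."""
    last_processed = int(cache.get("last_processed_end_time") or 0)
    if end_time <= last_processed:
        return None  # this interval was already handled, so do nothing
    result = work()
    # Record progress only after the work succeeded.
    cache["last_processed_end_time"] = str(end_time)
    return result


calls = []
fake_cache = {}  # dict stand-in for a real cache


def work():
    calls.append(1)
    return "done"


first = process_once(180, fake_cache, work)
second = process_once(180, fake_cache, work)  # simulated repeat of the same interval
print(first, second, len(calls))
```

Recording progress after the work completes means a failed run is retried rather than silently skipped.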
1.5.5 Optional: Increase the application memory in the manifest.json file
If your app is importing large code libraries, completing memory-intensive tasks, or running much slower than expected, then you may need to increase the memory setting. The default and minimum memory value for an application is 128 MB. The maximum value for the application memory is 10,240 MB. The example below shows the application memory increased, in 128 MB increments, to 640 MB.
"settings": {
"entrypoint": {
"file": "lambda_function",
"function": "lambda_handler"
},
"timeout": 120,
"memory": 640,
"environment": {"LOG_LEVEL": "DEBUG" },
"runtime": "python3.8",
"app": {
"scheduler_type": 2,
"cron_string": "*/3 * * * *"
}
},
Example of app memory setting in the manifest.json file for a ScheduledDataTimeEvent app
Note: To learn about Lambda function best practices, please refer to AWS Lambda's documentation: Best practices for working with AWS Lambda functions.
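The timeout and memory limits quoted in the two steps above can be checked locally before an upload. This is a sketch only: the helper name `check_lambda_settings` is hypothetical, and it checks nothing beyond the bounds stated in this tutorial (timeout up to 900 seconds, memory between 128 MB and 10,240 MB).

```python
def check_lambda_settings(settings):
    """Return a list of problems found in the manifest 'settings' object."""
    problems = []
    timeout = settings.get("timeout", 120)  # default from this tutorial
    memory = settings.get("memory", 128)    # default from this tutorial
    if timeout <= 0 or timeout > 900:
        problems.append(f"timeout {timeout} is outside the 1-900 second range")
    if memory < 128 or memory > 10240:
        problems.append(f"memory {memory} is outside the 128-10240 MB range")
    return problems


print(check_lambda_settings({"timeout": 240, "memory": 640}))  # []
print(check_lambda_settings({"timeout": 1200, "memory": 64}))
```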
1.6 Implement logic in the lambda_function.py file
Now that the app is configured, you can implement the logic in the lambda_function.py file.
Note: Implementing the logic in the lambda_function.py file is the most basic way to implement the logic. The user has the option to create directories and use Python libraries like Pydantic.
# 1. Import required functionality.
import statistics

from corva import Api, Cache, Logger, ScheduledDataTimeEvent, scheduled


# 2. Decorate your function using @scheduled. Use the existing lambda_handler function or define your own function.
# It must receive three arguments: event, api and cache. The arguments serve as building blocks for your app.
@scheduled
def lambda_handler(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
    # 3. Declare your variables from the event argument and start using the Api, Cache and Logger functionalities.
    # ScheduledDataTimeEvent provides: company_id (the company identifier), asset_id (the asset identifier),
    # start_time (the start time of the interval) and end_time (the end time of the interval).
    asset_id = event.asset_id
    company_id = event.company_id
    start_time = event.start_time
    end_time = event.end_time

    # 4. Use the event attributes to make an API request to corva#wits or any desired time-based dataset.
    # Fetch the real-time drilling data for the asset between the start and end time of the event.
    # start_time and end_time are inclusive, so the query uses $gte and $lte.
    # Only the rop field is requested since it is the only field needed. It is nested under data.
    # api.get_dataset is an SDK convenience method. See the API section for more information.
    records = api.get_dataset(
        provider="corva",
        dataset="wits",
        query={
            "asset_id": asset_id,
            "timestamp": {
                "$gte": start_time,
                "$lte": end_time,
            },
        },
        sort={"timestamp": 1},
        limit=500,
        fields="data.rop",
    )
    record_count = len(records)

    # Use the Logger functionality. The default log level is info. To use a different log level, set it in the
    # manifest.json file under "settings.environment": {"LOG_LEVEL": "DEBUG"}. See the Logger documentation.
    Logger.debug(f"{asset_id=} {company_id=}")
    Logger.debug(f"{start_time=} {end_time=} {record_count=}")

    # statistics.mean raises StatisticsError on empty input, so stop early when nothing was returned.
    if not records:
        Logger.debug("No records returned for this interval")
        return None

    # 5. Implement some calculations.
    # Compute the mean rop value from the list of real-time wits records. A missing rop value counts as 0.
    rop = statistics.mean(record.get("data", {}).get("rop", 0) for record in records)

    # Use the Cache functionality to get a previously set key. The Cache functionality is built on Redis.
    # See the Cache documentation for more information.
    # Get the last exported timestamp from the cache.
    last_exported_timestamp = int(cache.get(key="last_exported_timestamp") or 0)

    # Make sure we are not processing duplicate data.
    if end_time <= last_exported_timestamp:
        Logger.debug(f"Already processed data until {last_exported_timestamp=}")
        return None

    # 6. Set up the body of the POST request that stores the mean rop and the start_time and end_time of the interval.
    # The end_time of the interval is used as the record timestamp.
    output = {
        "timestamp": end_time,
        "asset_id": asset_id,
        "company_id": company_id,
        "provider": "big-data-energy",
        "collection": "example-scheduled-data-time-app",
        "data": {
            "rop": rop,
            "start_time": start_time,
            "end_time": end_time,
        },
        "version": 1,
    }
    Logger.debug(f"{output=}")

    # 7. Save the newly calculated data in a custom dataset.
    # Use the Api functionality. The data argument must be a list because Corva saves data as an array of
    # objects (records). See the Api documentation for more information.
    api.post(
        "api/v1/data/big-data-energy/example-scheduled-data-time-app/",
        data=[output],
    ).raise_for_status()

    # Use the Cache functionality to store the timestamp of the output.
    # The value is written as a string and converted back with int() when it is read above.
    cache.set(key="last_exported_timestamp", value=str(output["timestamp"]))

    return output
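The handler above treats a record with no rop value as 0, which pulls the mean down whenever values are missing. The sketch below shows one alternative, assuming records shaped like the `data.rop` documents in this tutorial: skip missing values and return None when nothing usable remains, since `statistics.mean` raises `StatisticsError` on empty input. The helper name `mean_rop` is hypothetical.

```python
import statistics


def mean_rop(records):
    """Mean of data.rop over records, ignoring records without a usable value."""
    values = [
        record["data"]["rop"]
        for record in records
        if isinstance(record.get("data"), dict) and record["data"].get("rop") is not None
    ]
    return statistics.mean(values) if values else None


sample = [{"data": {"rop": 10.0}}, {"data": {}}, {"data": {"rop": 20.0}}]
print(mean_rop(sample))  # 15.0
print(mean_rop([]))      # None
```

Whether missing values should count as 0 or be skipped depends on what the downstream dataset is meant to represent.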
1.7 Locally test your application
1.7.1 Running a local test for your Corva App
To locally test your Corva app, you need to follow these steps:
Create a local_run.py file in your project directory.
- This file will contain the code that simulates the environment in which your app will run.
- It uses environment variables for authentication and API access.
Set environment variables on your local machine.
Ensure you have set the appropriate environment variables, as shown in the code below.
Here's an example of how to export environment variables in your terminal:
export API_ROOT_URL="https://api.example.com"
export DATA_API_ROOT_URL="https://data-api.example.com"
export CORVA_API_KEY="your_api_key"
export APP_KEY="your_app_key"
You can add these export statements to your shell profile (e.g., .bashrc, .zshrc) for persistence, or run them directly in your terminal for the current session.
Run the local_run.py file.
Once you've created the file and set your environment variables, run the script using Python to simulate the app behavior in a local environment.
Example:
python local_run.py
Interpret the output.
- The output of your app will be printed in the terminal. You can use this output to verify the results of your function's execution in a local testing scenario.
Here's an example of what the local_run.py file should look like:
import os  # Used to read the environment variables

from corva import Api, Cache, Logger, ScheduledDataTimeEvent
from src.app import example_scheduled_data_time_app

# Main entry point for the script
if __name__ == '__main__':
    # Define test values for the asset and company IDs, and the start/end times for the event
    asset_id = 82891865  # Example asset ID for testing
    company_id = 196  # Example company ID for testing
    start_time = 1521425191  # Unix timestamp representing the start time
    end_time = 1521425371  # Unix timestamp representing the end time (180 seconds after the start time)

    # Create an instance of ScheduledDataTimeEvent to mock the event object
    # which would normally be provided during execution in a live environment
    _event = ScheduledDataTimeEvent(
        asset_id=asset_id, company_id=company_id, start_time=start_time, end_time=end_time
    )

    # Define a custom BearerApi class that extends the base Api class
    # and automatically adds authentication headers (Bearer token and app key) to requests
    class BearerApi(Api):
        @property
        def default_headers(self):
            # The default headers to be sent with each API request
            return {
                'Authorization': f'{self.api_key}',  # Bearer token for authentication
                'X-Corva-App': self.app_key,  # App key header specific to Corva apps
            }

    # Instantiate the BearerApi class using environment variables for API and app keys
    api = BearerApi(
        api_url=os.environ['API_ROOT_URL'],  # Base URL for the Corva API
        data_api_url=os.environ['DATA_API_ROOT_URL'],  # Base URL for the data API
        api_key=os.environ['CORVA_API_KEY'],  # API key for authentication (from environment)
        app_key=os.environ['APP_KEY'],  # App key (from environment)
    )

    # Initialize the cache, pointing to a Redis instance running locally on default port 6379
    cache = Cache(hash_name='cache_hash_name', redis_dsn='redis://127.0.0.1:6379/0')

    # Call the example app function with the mocked event, API instance, and cache object
    # The example_scheduled_data_time_app is the main business logic that is being tested here
    output = example_scheduled_data_time_app(event=_event, api=api, cache=cache)

    # Print the output to verify the result of the function call in the local environment
    print(output)
1.8 Deploy your application
Please see Getting Started section 4. Upload and Publish.
1.8.1 App Runner production testing
Please see App Runner section for more information.
1.9 Provision the application
For Company provisioning please see App Provisioning for more information.
For Corva Partners please see App Provisioning with a focus on App Purchases.
2. Depth Event
The use case for the example below is the app needs to GET 10 feet of data for the dep field from corva#drilling.wits.depth. The app needs to calculate the mean dep value for every 10 feet, then POST the mean dep value to a custom dataset named big-data-energy#example-scheduled-depth-app.
NOTE: To view the depth stream names and the depth stream's log identifier please make the following query:
https://api.corva.ai/v1/app_streams?company_id={companyId}&asset_id={assetId}&status=active&log_type=depth
App stream API request response example:
[
{
"id":44918,
"company_id":80,
"asset_id":72643645,
"name":"App Stream Example Well #1",
"configuration":{
},
"status":"active",
"visibility":"visible",
"segment":"drilling",
"source_type":"drilling",
"log_type":"depth",
"log_identifier":"16a57de71bb2",
"log_display_name":"Downhole_Depth",
"settings":{
"logIds":[
"6ebb880e-b2bc-3b3a-a11f-56b2deb0dbcf"
]
}
},
{
"id":44917,
"company_id":80,
"asset_id":72643645,
"name":"App Stream Example Well #1",
"configuration":{
},
"status":"active",
"visibility":"visible",
"segment":"drilling",
"source_type":"drilling",
"log_type":"depth",
"log_identifier":"222f83a8ad8b",
"log_display_name":"Surface_Depth",
"settings":{
"logIds":[
"8bc9832d-be1c-389b-aac2-f6e636b97ba3"
]
}
}
]
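Once the app stream response is available, the log identifier for a given depth log can be picked out by its display name. The sketch below uses a trimmed copy of the response shown above; the helper name `log_identifiers_by_name` is hypothetical, and it assumes each entry carries the `log_type`, `status`, `log_display_name` and `log_identifier` keys as in that example.

```python
def log_identifiers_by_name(streams):
    """Map log_display_name to log_identifier for active depth streams."""
    return {
        stream["log_display_name"]: stream["log_identifier"]
        for stream in streams
        if stream.get("log_type") == "depth" and stream.get("status") == "active"
    }


# Trimmed copy of the example response above.
streams = [
    {
        "id": 44918,
        "asset_id": 72643645,
        "status": "active",
        "log_type": "depth",
        "log_identifier": "16a57de71bb2",
        "log_display_name": "Downhole_Depth",
    },
    {
        "id": 44917,
        "asset_id": 72643645,
        "status": "active",
        "log_type": "depth",
        "log_identifier": "222f83a8ad8b",
        "log_display_name": "Surface_Depth",
    },
]

identifiers = log_identifiers_by_name(streams)
print(identifiers["Downhole_Depth"])  # 16a57de71bb2
```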
2.1 Follow the Getting Started Tutorial to install the prerequisites, create and copy app to your local machine.
2.2. Open and review the README.md file.
# Dev Center Python Polling Scheduler Data App
## Getting started
[Python SDK Documentation](https://corva-ai.github.io/python-sdk)
### 1. Install
You need to have `make` installed:
- Windows: [Windows Subsystem for Linux](https://docs.microsoft.com/en-us/windows/wsl/install)
- OS X: [Command Line Tools for Xcode](https://developer.apple.com/download/more/)
- Linux: refer to your distro-specific docs
To install dependencies run:
`make`
### 2. Run Tests
`make test`
### 3. Create app zip
`make bundle`
Example of a README.md file
2.3. Update to latest Python SDK version in requirements.txt file
corva-sdk==1.8.0
pytest==7.1.1
Example of a requirements.txt file for a ScheduledDepthEvent app
2.3.1. In the command line run
pip install -U corva-sdk
2.3.2 Or, within the requirements.txt file, set the latest SDK version (e.g. corva-sdk==1.8.0), then run
pip install -r requirements.txt
2.4. Optional: Install make dependency
2.4.1 Install make
make install
make all
2.5 Make adjustments to the manifest.json file
{
"format": 1,
"license": {
"type": "MIT",
"url": "https://www.oandgexample.com/license/"
},
"developer": {
"name": "O&G Company",
"identifier": "oandgc",
"authors": []
},
"application": {
"type": "scheduler",
"key": "big-data-energy.example_scheduler_depth_app",
"visibility": "private",
"name": "Example Scheduler Depth App",
"description": "This is the description of my app. You can do great things with it!",
"summary": "More information about this app goes here",
"category": "analytics",
"website": "https://www.oandgexample.com/my-app/",
"segments": [
"drilling"
]
},
"settings": {
"entrypoint": {
"file": "lambda_function",
"function": "lambda_handler"
},
"timeout": 120,
"memory": 128,
"environment": {"LOG_LEVEL": "DEBUG" },
"runtime": "python3.8",
"app": {
"scheduler_type": 4,
"depth_milestone": 10
}
},
"datasets": {
"big-data-energy.example-scheduled-depth-app": {
"permissions": [
"read",
"write"
]
},
"corva.drilling.wits.depth": {
"permissions": [
"read"
]
}
}
}
Example of a manifest.json file for a ScheduledDepthEvent app
2.5.1 Set the depth interval in the manifest.json file
The depth interval is set via the "depth_milestone" within the "settings.app" object. The minimum depth interval that can be set is 1 foot. There is no maximum depth interval. See the following example of setting the depth interval to 10 feet (ft):
"app": {
"scheduler_type": 4,
"depth_milestone": 10
}
Here is how the depth milestone works for every 10ft:
1st batch of incoming data: 20ft - 40ft
Events created:
- top_depth: 15ft, bottom_depth: 25 ft (depth: 20 ft)
- top_depth: 25ft, bottom_depth: 35 ft (depth: 30 ft)
- (no event for depth 40ft because 45ft was not reached)
2nd batch of incoming data: 40.1ft - 55ft:
Events created:
- top_depth: 35ft, bottom_depth: 45ft (depth: 40ft)
- top_depth: 45ft, bottom_depth: 55ft (depth: 50ft)
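The batching behavior in the example above can be reproduced with a short simulation. This sketch is inferred only from that worked example and is not the platform's scheduling code: it assumes events are centered on multiples of the milestone, span half a milestone on either side, and are emitted only once the bottom of the window has been reached. The function and parameter names are hypothetical.

```python
import math


def milestone_events(first_depth, max_depth, milestone, last_emitted=None):
    """Simulate which depth events a batch of incoming data would create."""
    half = milestone / 2
    if last_emitted is None:
        # Start at the first multiple of the milestone at or above the first data point.
        center = math.ceil(first_depth / milestone) * milestone
    else:
        center = last_emitted + milestone
    events = []
    # An event is created only when the bottom of its window has been reached.
    while center + half <= max_depth:
        events.append(
            {"top_depth": center - half, "bottom_depth": center + half, "depth": center}
        )
        center += milestone
    return events


first_batch = milestone_events(first_depth=20, max_depth=40, milestone=10)
second_batch = milestone_events(first_depth=40.1, max_depth=55, milestone=10, last_emitted=30)
print([event["depth"] for event in first_batch])   # [20, 30]
print([event["depth"] for event in second_batch])  # [40, 50]
```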
2.5.2 Set read and/or write permissions to datasets in the manifest.json file
Read and write permissions can only be given to company datasets (non-Corva), e.g. big-data-energy#example-scheduled-depth-app. Read permissions can be granted to Corva datasets, e.g. corva#drilling.wits.depth. The example below shows how to grant read and write permissions to big-data-energy#example-scheduled-depth-app and read permissions to corva#drilling.wits.depth.
"datasets": {
"big-data-energy.example-scheduled-depth-app": {
"permissions": [
"read",
"write"
]
},
"corva.drilling.wits.depth": {
"permissions": [
"read"
]
}
}
Example of dataset read/write permissions in the manifest.json file for a ScheduledDepthEvent app
2.5.3 Optional: Set Python Log Levels in environment variables in the manifest.json file
The default Log Level in the Corva SDK is Logger.info(). In order to change the default Log Level, you must set the log level in the environment variables within the manifest.json file. The example below shows how to change the default Log Level to Logger.debug().
"settings": {
"entrypoint": {
"file": "lambda_function",
"function": "lambda_handler"
},
"timeout": 120,
"memory": 128,
"environment": {"LOG_LEVEL": "DEBUG" },
"runtime": "python3.8",
"app": {
"scheduler_type": 4,
"depth_milestone": 10
}
}
Example of Python log levels environment variable setting in the manifest.json file for a ScheduledDepthEvent app
Note: To learn about the different log levels, please refer to the Logging HOWTO in Python's logging documentation and to the Corva Logging Documentation.
2.5.4 Optional: Increase the application timeout time in the manifest.json file
If your app runs longer than the timeout value, first make sure that the Lambda function is idempotent, so that a repeated invocation does not produce duplicate results. If the app genuinely needs more time, increase the timeout value. The default timeout value for an application is 120 seconds. The maximum value for the application timeout is 900 seconds. The example below shows the application timeout increased to 240 seconds.
"settings": {
"entrypoint": {
"file": "lambda_function",
"function": "lambda_handler"
},
"timeout": 240,
"memory": 128,
"environment": {"LOG_LEVEL": "DEBUG" },
"runtime": "python3.8",
"app": {
"scheduler_type": 4,
"depth_milestone": 10
}
},
Example of app timeout setting in the manifest.json file for a ScheduledDepthEvent app
Note: To learn about Lambda function best practices, please refer to AWS Lambda's documentation: Best practices for working with AWS Lambda functions.
2.5.5 Optional: Increase the application memory in the manifest.json file
If your app is importing large code libraries, completing memory-intensive tasks, or running much slower than expected, then you may need to increase the memory setting. The default and minimum memory value for an application is 128 MB. The maximum value for the application memory is 10,240 MB. The example below shows the application memory increased, in 128 MB increments, to 640 MB.
"settings": {
"entrypoint": {
"file": "lambda_function",
"function": "lambda_handler"
},
"timeout": 120,
"memory": 640,
"environment": {"LOG_LEVEL": "DEBUG" },
"runtime": "python3.8",
"app": {
"scheduler_type": 4,
"depth_milestone": 10
}
},
Example of app memory setting in the manifest.json file for a ScheduledDepthEvent app
Note: To learn about Lambda function best practices, please refer to AWS Lambda's documentation: Best practices for working with AWS Lambda functions.
2.6 Implement logic in the lambda_function.py file
Now that the app is configured, you can implement the logic in the lambda_function.py file.
Note: Implementing the logic in the lambda_function.py file is the most basic way to implement the logic. The user has the option to create directories and use Python libraries like Pydantic.
# 1. Import required functionality.
import statistics

from corva import Api, Cache, Logger, ScheduledDepthEvent, scheduled


# 2. Decorate your function using @scheduled. Use the existing lambda_handler function or define your own function.
# It must receive three arguments: event, api and cache. The arguments serve as building blocks for your app.
@scheduled
def lambda_handler(event: ScheduledDepthEvent, api: Api, cache: Cache):
    # 3. Declare your variables from the event argument and start using the Api, Cache and Logger functionalities.
    # ScheduledDepthEvent provides: asset_id (the asset identifier), company_id (the company identifier),
    # top_depth (the start depth in ft), bottom_depth (the end depth in ft), interval (the distance between two
    # schedule triggers) and log_identifier (the app stream log identifier, used to scope data by stream).
    # The asset may be connected to multiple depth-based logs.
    asset_id = event.asset_id
    company_id = event.company_id
    log_identifier = event.log_identifier
    top_depth = event.top_depth
    bottom_depth = event.bottom_depth
    interval = event.interval

    # 4. Use the event attributes to make an API request to corva#drilling.wits.depth or any desired depth-based dataset.
    # Fetch the real-time drilling data for the asset between the top and bottom depth of the event.
    # The $gte and $lte operators include both bounds of the depth range.
    # Only the dep field is requested since it is the only field needed. It is nested under data.
    # api.get_dataset is an SDK convenience method. See the API section for more information.
    records = api.get_dataset(
        provider="corva",
        dataset="drilling.wits.depth",
        query={
            "asset_id": asset_id,
            "log_identifier": log_identifier,
            "measured_depth": {
                "$gte": top_depth,
                "$lte": bottom_depth,
            },
        },
        sort={"measured_depth": 1},
        limit=500,
        fields="data.dep",
    )
    record_count = len(records)

    # Use the Logger functionality. The default log level is info. To use a different log level, set it in the
    # manifest.json file under "settings.environment": {"LOG_LEVEL": "DEBUG"}. See the Logger documentation.
    Logger.debug(f"{asset_id=} {company_id=}")
    Logger.debug(f"{top_depth=} {bottom_depth=} {interval=} {record_count=}")

    # statistics.mean raises StatisticsError on empty input, so stop early when nothing was returned.
    if not records:
        Logger.debug("No records returned for this depth range")
        return None

    # 5. Implement some calculations.
    # Compute the mean dep value from the list of real-time drilling.wits.depth records. A missing dep counts as 0.
    mean_dep = statistics.mean(record.get("data", {}).get("dep", 0) for record in records)

    # Use the Cache functionality to get a previously set key. The Cache functionality is built on Redis.
    # See the Cache documentation for more information.
    # Get the last exported depth from the cache.
    last_exported_measured_depth = float(cache.get(key="measured_depth") or 0)

    # Make sure we are not processing duplicate data.
    if bottom_depth <= last_exported_measured_depth:
        Logger.debug(f"Already processed data until {last_exported_measured_depth=}")
        return None

    # 6. Set up the body of the POST request that stores the mean dep and the top_depth and bottom_depth of the interval.
    # The bottom_depth of the interval is used as the record's measured_depth.
    output = {
        "measured_depth": bottom_depth,
        "asset_id": asset_id,
        "company_id": company_id,
        "provider": "big-data-energy",
        "collection": "example-scheduled-depth-app",
        "log_identifier": log_identifier,
        "data": {
            "mean_dep": mean_dep,
            "top_depth": top_depth,
            "bottom_depth": bottom_depth,
        },
        "version": 1,
    }
    Logger.debug(f"{output=}")

    # 7. Save the newly calculated data in a custom dataset.
    # Use the Api functionality. The data argument must be a list because Corva saves data as an array of
    # objects (records). See the Api documentation for more information.
    api.post(
        "api/v1/data/big-data-energy/example-scheduled-depth-app/",
        data=[output],
    ).raise_for_status()

    # Use the Cache functionality to store the measured_depth of the output.
    # The value is written as a string and converted back with float() when it is read above.
    cache.set(key="measured_depth", value=str(output["measured_depth"]))

    return output
2.7 Locally test your application
To locally test your Corva app, you need to follow these steps:
Create a local_run.py file in your project directory.
- This file will contain the code that simulates the environment in which your app will run.
- It uses environment variables for authentication and API access.
Set environment variables on your local machine.
Ensure you have set the appropriate environment variables, as shown in the code below.
Here's an example of how to export environment variables in your terminal:
export API_ROOT_URL="https://api.example.com"
export DATA_API_ROOT_URL="https://data-api.example.com"
export CORVA_API_KEY="your_api_key"
export APP_KEY="your_app_key"
You can add these export statements to your shell profile (e.g., .bashrc, .zshrc) for persistence, or run them directly in your terminal for the current session.
Run the local_run.py file.
Once you've created the file and set your environment variables, run the script using Python to simulate the app behavior in a local environment.
Example:
python local_run.py
Interpret the output.
- The output of your app will be printed in the terminal. You can use this output to verify the results of your function's execution in a local testing scenario.
Here's an example of what the local_run.py file should look like:
import os  # Used to read the API URLs and keys from environment variables

from corva import Api, Cache, Logger, ScheduledDepthEvent
from src.app import example_scheduled_depth_app

# Main entry point for the script
if __name__ == '__main__':
    # Define test values for the asset and company IDs, and the depth range for the event
    asset_id = 82891865  # Example asset ID for testing
    company_id = 196  # Example company ID for testing
    top_depth = 1000  # Example top depth value for testing
    bottom_depth = 2000  # Example bottom depth value for testing
    interval = 1  # Example interval value for testing
    log_identifier = 1  # Example log identifier for testing

    # Create an instance of ScheduledDepthEvent to mock the event object
    # which would normally be provided during execution in a live environment
    _event = ScheduledDepthEvent(
        asset_id=asset_id,
        company_id=company_id,
        interval=interval,
        top_depth=top_depth,
        bottom_depth=bottom_depth,
        log_identifier=log_identifier,
    )

    # Define a custom BearerApi class that extends the base Api class
    # and automatically adds authentication headers (API key and app key) to requests
    class BearerApi(Api):
        @property
        def default_headers(self):
            # The default headers to be sent with each API request
            return {
                'Authorization': f'{self.api_key}',  # Sent exactly as stored in CORVA_API_KEY
                'X-Corva-App': self.app_key,  # App key header specific to Corva apps
            }

    # Instantiate the BearerApi class using environment variables for API and app keys
    api = BearerApi(
        api_url=os.environ['API_ROOT_URL'],  # Base URL for the Corva API
        data_api_url=os.environ['DATA_API_ROOT_URL'],  # Base URL for the data API
        api_key=os.environ['CORVA_API_KEY'],  # API key for authentication (from environment)
        app_key=os.environ['APP_KEY'],  # App key (from environment)
    )

    # Initialize the cache, pointing to a Redis instance running locally on default port 6379
    cache = Cache(hash_name='cache_hash_name', redis_dsn='redis://127.0.0.1:6379/0')

    # Call the example app function with the mocked event, API instance, and cache object
    # The example_scheduled_depth_app is the main business logic that is being tested here
    output = example_scheduled_depth_app(event=_event, api=api, cache=cache)

    # Print the output to verify the result of the function call in the local environment
    print(output)
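Before running local_run.py, it can help to confirm that the four environment variables it reads are actually set, since a missing one otherwise surfaces as a KeyError partway through the script. The sketch below is a hypothetical helper (missing_env_vars is not part of the Corva SDK); the variable names are the ones exported in the steps above.

```python
import os
from typing import Iterable, List, Mapping

# Variable names taken from the export statements in this section
REQUIRED_ENV_VARS = ("API_ROOT_URL", "DATA_API_ROOT_URL", "CORVA_API_KEY", "APP_KEY")


def missing_env_vars(
    required: Iterable[str] = REQUIRED_ENV_VARS,
    environ: Mapping[str, str] = os.environ,
) -> List[str]:
    """Return the required variables that are unset or empty (hypothetical helper)."""
    return [name for name in required if not environ.get(name)]


# Example with an explicit mapping instead of the real environment
sample_environ = {"API_ROOT_URL": "https://api.example.com", "APP_KEY": "your_app_key"}
print(missing_env_vars(environ=sample_environ))
```

Calling missing_env_vars() with no arguments checks the real environment; an empty list means local_run.py has everything it needs.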
2.8 Deploy your application
Please see Getting Started section 4. Upload and Publish.
2.8.1 App Runner production testing
Please see App Runner section for more information.
2.9 Provision the application
For Company provisioning please see App Provisioning for more information.
For Corva Partners please see App Provisioning with a focus on App Purchases.
3. Natural Time Event
In the example below, the app must be invoked once every 5 minutes without interruption. On each invocation it GETs the last 5 minutes of drilling mud density data from corva#hydraulics.surge-and-swab, calculates the mean mud density for that interval whether data is available or not, then POSTs the calculated value to a custom dataset named big-data-energy#example-scheduled-natural-time-app.
3.1 Follow the Getting Started Tutorial to install the prerequisites, create the app, and copy it to your local machine.
3.2. Open and review the README.md file.
# Dev Center Python Polling Scheduler Data App
## Getting started
[Python SDK Documentation](https://corva-ai.github.io/python-sdk)
### 1. Install
You need to have `make` installed:
- Windows: [Windows Subsystem for Linux](https://docs.microsoft.com/en-us/windows/wsl/install)
- OS X: [Command Line Tools for Xcode](https://developer.apple.com/download/more/)
- Linux: refer to your distro-specific docs
To install dependencies run:
`make`
### 2. Run Tests
`make test`
### 3. Create app zip
`make bundle`
Example of a README.md file
3.3. Update to latest Python SDK version in requirements.txt file
corva-sdk==1.8.0
pytest==7.1.1
Example of a requirements.txt file for a ScheduledNaturalTimeEvent app
3.3.1. In the command line run
pip install -U corva-sdk
3.3.2 Or set the latest SDK version in the requirements.txt file, e.g. corva-sdk==1.8.0, then run
pip install -r requirements.txt
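After upgrading, you may want to confirm which SDK version is actually installed in the active environment. A minimal sketch using only the Python standard library (importlib.metadata, available from Python 3.8); installed_version is a hypothetical helper name, and the package name corva-sdk is taken from requirements.txt above.

```python
from importlib import metadata
from typing import Optional


def installed_version(package: str) -> Optional[str]:
    """Return the installed version of a package, or None if it is not installed."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


# Prints the installed corva-sdk version, or None if the package is missing
print(installed_version("corva-sdk"))
```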
3.4. Optional: Install dependencies using make
3.4.1 Run the make targets
make install
make all
3.5 Make adjustments to the manifest.json file
{
"format": 1,
"license": {
"type": "MIT",
"url": "https://www.oandgexample.com/license/"
},
"developer": {
"name": "O&G Company",
"identifier": "oandgc",
"authors": []
},
"application": {
"type": "scheduler",
"key": "big-data-energy.example_scheduler_natural_time_app",
"visibility": "private",
"name": "Example Scheduler Natural Time App",
"description": "This is the description of my app. You can do great things with it!",
"summary": "More information about this app goes here",
"category": "analytics",
"website": "https://www.oandgexample.com/my-app/",
"segments": [
"drilling"
]
},
"settings": {
"entrypoint": {
"file": "lambda_function",
"function": "lambda_handler"
},
"timeout": 120,
"memory": 128,
"environment": {},
"runtime": "python3.8",
"app": {
"scheduler_type": 1,
"cron_string": "*/5 * * * *"
}
},
"datasets": {
"big-data-energy.example-scheduled-natural-time-app": {
"permissions": [
"read",
"write"
]
},
"corva.hydraulics.surge-and-swab": {
"permissions": [
"read"
]
}
}
}
Example of a manifest.json file for a ScheduledNaturalTimeEvent app
3.5.1 Set the time interval in the manifest.json file
The time interval is set via the "cron_string" within the "settings.app" object. The minimum time interval that can be set is 1 minute. See the following example of setting the time interval to 5 minutes clock time:
"app": {
"scheduler_type": 1,
"cron_string": "*/5 * * * *"
}
The maximum time interval that can be set is once per day. See the following example of setting the time interval to once per day:
"app": {
"scheduler_type": 1,
"cron_string": "0 0 * * *"
}
Example of "cron_string" in the manifest.json file for a ScheduledNaturalTimeEvent app
Note: A free tool for determining the "cron_string" is the crontab guru.
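To make the two cron strings above concrete, the sketch below expands the minute field of a five-field cron string into the minutes of the hour on which it matches. It is a deliberately simplified illustration that only understands "*", "*/N" and a single number; it is not a full cron parser and not how Corva evaluates the schedule. The function names are hypothetical.

```python
from typing import List


def minute_field(cron_string: str) -> str:
    """Return the minute field (the first of five fields) of a cron string."""
    fields = cron_string.split()
    if len(fields) != 5:
        raise ValueError(f"expected 5 cron fields, got {len(fields)}")
    return fields[0]


def matching_minutes(field: str) -> List[int]:
    """Expand '*', '*/N' or a single number into the minutes of the hour it matches."""
    if field == "*":
        return list(range(60))
    if field.startswith("*/"):
        step = int(field[2:])
        if step < 1:
            raise ValueError("step must be at least 1")
        return list(range(0, 60, step))
    minute = int(field)
    if not 0 <= minute <= 59:
        raise ValueError("minute must be between 0 and 59")
    return [minute]


# "*/5 * * * *" matches every fifth minute of every hour
print(matching_minutes(minute_field("*/5 * * * *")))
# "0 0 * * *" matches minute 0 only (and, via its hour field, only at hour 0)
print(matching_minutes(minute_field("0 0 * * *")))
```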
3.5.2 Set read and/or write permissions to datasets in the manifest.json file
Read and write permissions can only be granted to company (non-Corva) datasets, e.g. big-data-energy#example-scheduled-natural-time-app. Only read permissions can be granted to Corva datasets, e.g. corva#hydraulics.surge-and-swab. The example below shows how to grant read and write permissions to big-data-energy#example-scheduled-natural-time-app and read permissions to corva#hydraulics.surge-and-swab.
"datasets": {
"big-data-energy.example-scheduled-natural-time-app": {
"permissions": [
"read",
"write"
]
},
"corva.hydraulics.surge-and-swab": {
"permissions": [
"read"
]
}
}
Example of dataset read/write permissions in the manifest.json file for a ScheduledNaturalTimeEvent app
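The manifest writes dataset names as provider.dataset, while the prose uses the provider#dataset notation. The sketch below assumes, based only on the examples in this section, that the provider is everything before the first dot; the helpers split_dataset_key and writable_datasets are hypothetical and simply list which datasets a manifest grants write access to.

```python
from typing import Dict, List, Tuple


def split_dataset_key(key: str) -> Tuple[str, str]:
    """Split a manifest dataset key at the first dot (assumed convention)."""
    provider, _, dataset = key.partition(".")
    if not provider or not dataset:
        raise ValueError(f"not a provider.dataset key: {key!r}")
    return provider, dataset


def writable_datasets(datasets: Dict[str, dict]) -> List[str]:
    """Return the datasets with write permission, in provider#dataset notation."""
    result = []
    for key, config in datasets.items():
        if "write" in config.get("permissions", []):
            provider, dataset = split_dataset_key(key)
            result.append(f"{provider}#{dataset}")
    return result


manifest_datasets = {
    "big-data-energy.example-scheduled-natural-time-app": {"permissions": ["read", "write"]},
    "corva.hydraulics.surge-and-swab": {"permissions": ["read"]},
}
print(writable_datasets(manifest_datasets))
```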
3.5.3 Optional: Set Python Log Levels in environment variables in the manifest.json file
The default log level in the Corva SDK is INFO (Logger.info()). To change it, set the LOG_LEVEL environment variable in the manifest.json file. The example below changes the log level to DEBUG (Logger.debug()).
"settings": {
"entrypoint": {
"file": "lambda_function",
"function": "lambda_handler"
},
"timeout": 120,
"memory": 128,
"environment": {"LOG_LEVEL": "DEBUG" },
"runtime": "python3.8",
"app": {
"scheduler_type": 1,
"cron_string": "*/5 * * * *"
}
},
Example of Python log levels environment variable setting in the manifest.json file for a ScheduledNaturalTimeEvent app
Note: To learn about the different log levels, please refer to the Logging HOWTO in Python's Logging Documentation and to the Corva Logging Documentation.
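The effect of a log level can be illustrated with Python's standard logging module. The sketch below is not the Corva Logger and makes no claim about how the SDK reads LOG_LEVEL internally; resolve_log_level is a hypothetical helper that maps a LOG_LEVEL value to a standard logging level, falling back to INFO.

```python
import logging
from typing import Mapping


def resolve_log_level(environ: Mapping[str, str], default: int = logging.INFO) -> int:
    """Map a LOG_LEVEL value such as 'DEBUG' to a logging level, else use the default."""
    name = environ.get("LOG_LEVEL", "").upper()
    level = getattr(logging, name, None) if name else None
    return level if isinstance(level, int) else default


logger = logging.getLogger("example_scheduler_app")

# With the default level (INFO), debug messages are filtered out
logger.setLevel(resolve_log_level({}))
print(logger.isEnabledFor(logging.DEBUG))

# With LOG_LEVEL set to DEBUG, debug messages are emitted
logger.setLevel(resolve_log_level({"LOG_LEVEL": "DEBUG"}))
print(logger.isEnabledFor(logging.DEBUG))
```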
3.5.4 Optional: Increase the application timeout time in the manifest.json file
If your app can run longer than the timeout value, first make sure that the Lambda function is idempotent, so that running it again for the same interval does not duplicate results. If the app legitimately needs more time, increase the timeout value. The default timeout value for an application is 120 seconds. The maximum value for the application timeout is 900 seconds. The example below shows the application timeout increased to 240 seconds.
"settings": {
"entrypoint": {
"file": "lambda_function",
"function": "lambda_handler"
},
"timeout": 240,
"memory": 128,
"environment": {"LOG_LEVEL": "DEBUG" },
"runtime": "python3.8",
"app": {
"scheduler_type": 1,
"cron_string": "*/5 * * * *"
}
},
Example of app timeout setting in the manifest.json file for a ScheduledNaturalTimeEvent app
Note: Please refer to AWS Lambda's Documentation to learn about lambda function best practices here Best practices for working with AWS Lambda functions.
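One way to picture idempotency for a scheduled app is a guard that remembers the last interval it finished and skips any interval at or before it. The sketch below uses a plain dict as a stand-in for the Corva Cache, and run_once_per_interval is a hypothetical helper; the key name last_exported_timestamp matches the one used in the handler code of this guide.

```python
from typing import Callable, MutableMapping, Optional

CACHE_KEY = "last_exported_timestamp"


def run_once_per_interval(
    cache: MutableMapping[str, str],
    schedule_end: int,
    work: Callable[[], dict],
) -> Optional[dict]:
    """Run work() only if this interval has not been completed before."""
    last_done = int(cache.get(CACHE_KEY) or 0)
    if schedule_end <= last_done:
        return None  # Already processed, so a repeated invocation does nothing
    result = work()
    # Record completion only after the work succeeded
    cache[CACHE_KEY] = str(schedule_end)
    return result


calls = []


def do_work() -> dict:
    calls.append("ran")
    return {"status": "done"}


fake_cache: dict = {}
print(run_once_per_interval(fake_cache, 300, do_work))  # first run does the work
print(run_once_per_interval(fake_cache, 300, do_work))  # repeated run is skipped
```

Because completion is recorded only after work() returns, an invocation that times out midway leaves the marker untouched and the interval is processed again on the next attempt.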
3.5.5 Optional: Increase the application memory in the manifest.json file
If your app is importing large code libraries, completing memory-intensive tasks, or is running much slower than expected, then you may need to increase the memory setting. The default and minimum memory value for an application is 128 MB. The maximum value for the application memory is 10,240 MB. The example below shows the application memory increased, in 128 MB increments, to 640 MB.
"settings": {
"entrypoint": {
"file": "lambda_function",
"function": "lambda_handler"
},
"timeout": 120,
"memory": 640,
"environment": {"LOG_LEVEL": "DEBUG" },
"runtime": "python3.8",
"app": {
"scheduler_type": 1,
"cron_string": "*/5 * * * *"
}
},
Example of app memory setting in the manifest.json file for a ScheduledNaturalTimeEvent app
Note: Please refer to AWS Lambda's Documentation to learn about lambda function best practices here Best practices for working with AWS Lambda functions.
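The timeout and memory limits described above can be checked before uploading the app. The sketch below is a hypothetical helper (check_settings is not part of any Corva tooling) that applies only the bounds stated in this section: a timeout of at most 900 seconds and memory between 128 MB and 10,240 MB.

```python
from typing import List

TIMEOUT_DEFAULT_S, TIMEOUT_MAX_S = 120, 900
MEMORY_MIN_MB, MEMORY_MAX_MB = 128, 10240


def check_settings(settings: dict) -> List[str]:
    """Return a list of problems found in the manifest "settings" object."""
    problems = []
    timeout = settings.get("timeout", TIMEOUT_DEFAULT_S)
    memory = settings.get("memory", MEMORY_MIN_MB)
    if not 0 < timeout <= TIMEOUT_MAX_S:
        problems.append(f"timeout {timeout} s is outside 1..{TIMEOUT_MAX_S} s")
    if not MEMORY_MIN_MB <= memory <= MEMORY_MAX_MB:
        problems.append(f"memory {memory} MB is outside {MEMORY_MIN_MB}..{MEMORY_MAX_MB} MB")
    return problems


print(check_settings({"timeout": 240, "memory": 640}))
print(check_settings({"timeout": 1200, "memory": 64}))
```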
3.6 Implement logic in the lambda_function.py file
Now that the app is configured, you can implement the logic in the lambda_function.py file.
Note: Implementing the logic in the lambda_function.py file is the most basic way to implement the logic. The user has the option to create directories and use Python libraries like Pydantic.
# 1. Import required functionality.
import statistics

from corva import Api, Cache, Logger, ScheduledNaturalTimeEvent, scheduled


# 2. Decorate your function using @scheduled. Use the existing lambda_handler function or define your own function. It must receive three arguments: event, api and cache. The arguments serve as building blocks for your app.
@scheduled
def lambda_handler(event: ScheduledNaturalTimeEvent, api: Api, cache: Cache):
    # 3. Here is where you can declare your variables from the argument event: ScheduledNaturalTimeEvent and start using Api, Cache and Logger functionalities.
    # The scheduled app can read the following attributes from the ScheduledNaturalTimeEvent: company_id: The company identifier; asset_id: The asset identifier; schedule_start: The start time of interval; interval: The time between two schedule triggers.
    asset_id = event.asset_id
    company_id = event.company_id
    schedule_start = event.schedule_start
    interval = event.interval
    schedule_end = int(schedule_start + interval)

    # 4. Utilize the attributes from the ScheduledNaturalTimeEvent to make an API request to corva#hydraulics.surge-and-swab or any desired time type dataset.
    # Fetch the data for the asset between the start and end time of the interval.
    # Both bounds of the query are inclusive; the cache check further below prevents an interval from being processed twice.
    # We are only querying for the mud_density field since that is the only field we need. It is nested under data. We are using the SDK convenience method api.get_dataset. See API Section for more information on convenience method.
    records = api.get_dataset(
        provider="corva",
        dataset="hydraulics.surge-and-swab",
        query={
            'asset_id': asset_id,
            'timestamp': {
                '$gte': schedule_start,
                '$lte': schedule_end
            }
        },
        sort={'timestamp': 1},
        limit=500,
        fields="data.mud_density"
    )
    record_count = len(records)

    # Utilize the Logger functionality. The default log level is Logger.info. To use a different log level, the log level must be specified in the manifest.json file in the "settings.environment": {"LOG_LEVEL": "DEBUG"}. See the Logger documentation for more information.
    Logger.debug(f"{record_count=}")

    # 5. Implementing some calculations
    # Collect the mud_density values, skipping records that do not carry one
    mud_densities = [
        record["data"]["mud_density"]
        for record in records
        if record.get("data", {}).get("mud_density") is not None
    ]
    # statistics.mean raises StatisticsError on empty input, so fall back to None when no data is available
    mean_mud_density = statistics.mean(mud_densities) if mud_densities else None

    # Utilize the Cache functionality to get a key value. The Cache functionality is built on Redis Cache. See the Cache documentation for more information.
    # Getting last exported timestamp from Cache
    last_exported_timestamp = int(cache.get(key='last_exported_timestamp') or 0)

    # Making sure we are not processing duplicate data
    if schedule_end <= last_exported_timestamp:
        Logger.debug(f"Already processed data until {last_exported_timestamp=}")
        return None

    # 6. This is how to set up a body of a POST request to store the mean mud_density data and the schedule_start and schedule_end of the interval from the event.
    output = {
        "timestamp": schedule_end,  # The end of the interval is used as the record timestamp
        "asset_id": asset_id,
        "company_id": company_id,
        "provider": "big-data-energy",
        "collection": "example-scheduled-natural-time-app",
        "data": {
            "mean_mud_density": mean_mud_density,
            "schedule_start": schedule_start,
            "schedule_end": schedule_end
        },
        "version": 1
    }

    # Utilize the Logger functionality.
    Logger.debug(f"{asset_id=} {company_id=}")
    Logger.debug(f"{schedule_start=} {schedule_end=} {record_count=}")
    Logger.debug(f"{output=}")

    # 7. Save the newly calculated data in a custom dataset
    # Utilize the Api functionality. The data needs to be an array because Corva's data is saved as an array of objects (records). See the Api documentation for more information.
    api.post(
        "api/v1/data/big-data-energy/example-scheduled-natural-time-app/", data=[output],
    ).raise_for_status()

    # Utilize the Cache functionality to set a key value. The Cache functionality is built on Redis Cache. See the Cache documentation for more information. This example stores the timestamp of the output in the Cache.
    cache.set(key='last_exported_timestamp', value=str(output["timestamp"]))

    return output
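Since the README provides a make test target and requirements.txt includes pytest, the mean calculation is a natural candidate for a small unit test. The sketch below isolates it in a pure function that needs neither the Corva API nor Redis. The name mean_mud_density and the choice to return None when no values are present are assumptions made for illustration, not part of the SDK.

```python
import statistics
from typing import Iterable, Optional


def mean_mud_density(records: Iterable[dict]) -> Optional[float]:
    """Return the mean of data.mud_density over the records, or None if there are no values."""
    values = [
        record["data"]["mud_density"]
        for record in records
        if record.get("data", {}).get("mud_density") is not None
    ]
    return statistics.mean(values) if values else None


def test_mean_mud_density():
    records = [
        {"timestamp": 1, "data": {"mud_density": 9.0}},
        {"timestamp": 2, "data": {"mud_density": 10.0}},
        {"timestamp": 3, "data": {}},  # A record without a value is skipped
    ]
    assert mean_mud_density(records) == 9.5
    assert mean_mud_density([]) is None
```

Skipping missing values rather than counting them as 0 keeps gaps in the data from pulling the mean down.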
3.7 Locally test your application
To locally test your Corva app, you need to follow these steps:
Create a local_run.py file in your project directory.
- This file will contain the code that simulates the environment in which your app will run.
- It uses environment variables for authentication and API access.
Set environment variables on your local machine.
Ensure you have set the appropriate environment variables, as shown in the code below.
Here's an example of how to export environment variables in your terminal:
export API_ROOT_URL="https://api.example.com"
export DATA_API_ROOT_URL="https://data-api.example.com"
export CORVA_API_KEY="your_api_key"
export APP_KEY="your_app_key"
You can add these export statements to your shell profile (e.g., .bashrc, .zshrc) for persistence, or run them directly in your terminal for the current session.
Run the local_run.py file.
Once you've created the file and set your environment variables, run the script using Python to simulate the app behavior in a local environment.
Example:
python local_run.py
Interpret the output.
- The output of your app will be printed in the terminal. You can use this output to verify the results of your function's execution in a local testing scenario.
Here’s an example of what the local_run.py file should look like:
import os  # Used to read the API URLs and keys from environment variables

from corva import Api, Cache, Logger, ScheduledNaturalTimeEvent
# Adjust the module and function name to match where your app's logic lives
from src.app import example_drilling_scheduler_app

# Main entry point for the script
if __name__ == '__main__':
    # Define test values for the asset and company IDs, and the start/end times for the event
    asset_id = 82891865  # Example asset ID for testing
    company_id = 196  # Example company ID for testing
    start_time = "1521425191"  # Unix timestamp representing the start time
    end_time = "1521425191"  # Unix timestamp representing the end time
    schedule_start = 0  # Schedule start time for the event
    interval = 1  # Interval for the event

    # Create an instance of ScheduledNaturalTimeEvent to mock the event object
    # which would normally be provided during execution in a live environment
    _event = ScheduledNaturalTimeEvent(
        asset_id=asset_id,
        company_id=company_id,
        schedule_start=schedule_start,
        interval=interval,
        start_time=start_time,
        end_time=end_time,
    )

    # Define a custom BearerApi class that extends the base Api class
    # and automatically adds authentication headers (API key and app key) to requests
    class BearerApi(Api):
        @property
        def default_headers(self):
            # The default headers to be sent with each API request
            return {
                'Authorization': f'{self.api_key}',  # Sent exactly as stored in CORVA_API_KEY
                'X-Corva-App': self.app_key,  # App key header specific to Corva apps
            }

    # Instantiate the BearerApi class using environment variables for API and app keys
    api = BearerApi(
        api_url=os.environ['API_ROOT_URL'],  # Base URL for the Corva API
        data_api_url=os.environ['DATA_API_ROOT_URL'],  # Base URL for the data API
        api_key=os.environ['CORVA_API_KEY'],  # API key for authentication (from environment)
        app_key=os.environ['APP_KEY'],  # App key (from environment)
    )

    # Initialize the cache, pointing to a Redis instance running locally on default port 6379
    cache = Cache(hash_name='cache_hash_name', redis_dsn='redis://127.0.0.1:6379/0')

    # Call the example app function with the mocked event, API instance, and cache object
    # The example_drilling_scheduler_app is the main business logic that is being tested here
    output = example_drilling_scheduler_app(event=_event, api=api, cache=cache)

    # Print the output to verify the result of the function call in the local environment
    print(output)
3.8 Deploy your application
Please see Getting Started section 4. Upload and Publish.
3.8.1 App Runner production testing
Please see App Runner section for more information.
3.9 Provision the application
For Company provisioning please see App Provisioning for more information.
For Corva Partners please see App Provisioning with a focus on App Purchases.